package com.learn.sunday.es.compont;

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.learn.sunday.es.bean.UserAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;

@Slf4j
@Component
public class FlinkETLJob implements CommandLineRunner {
    @Override
    public void run(String... args) throws Exception {

        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);

        DataStream<UserAction> mysqlDataStream = environment.addSource(new MySQLSource());

        // 从Oracle读取数据
        DataStream<UserAction> oracleDataStream = environment.addSource(new OracleSource());
        // 合并两个数据流
        DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);

        // 清洗和转换数据
        DataStream<UserAction> transformedStream = mergedStream.map((MapFunction<UserAction, UserAction>) value -> {
            // 进行清洗和转换
            value.setAction(value.getAction().toUpperCase());
            log.info("处理完成的数据是:{}", JSON.toJSONString(value));
            String format = DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT);
            value.setTimestamp(format);
            value.setAction(format + value.getAction());
            return value;
        });

        // 将数据写入目标MySQL数据库
//        transformedStream.addSink(new MysqlSink());

        //keyBy 分组，根据数据相关特性分成一组
//        transformedStream.keyBy(value -> {
//            int id = value.getId();
//            return id;
//        }).flatMap(new MySQLSink()).print();

        transformedStream.keyBy(value -> {
            int id = value.getId();
            return id;
        }).sum("userId").print();

//        transformedStream.

        // 执行任务
        environment.execute("Flink ETL Job");

    }

    public static class MySQLSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
        private static final String JDBC_USER = "root";
        private static final String JDBC_PASSWORD = "Admin123!";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
//                while (isRunning) {
                    String sql = "SELECT * FROM source_mysql_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getInt("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流，每5秒查询一次
//                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }


    public static class OracleSource implements SourceFunction<UserAction> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
        private static final String JDBC_USER = "root";
        private static final String JDBC_PASSWORD = "Admin123!";
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
//                while (isRunning) {
                    String sql = "SELECT * FROM source_oracle_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getInt("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // 模拟实时数据流，每5秒查询一次
//                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }


    public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {
        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
        private static final String JDBC_USER = "root";
        private static final String JDBC_PASSWORD = "Admin123!";
        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
            statement = connection.prepareStatement(sql);
        }

        @Override
        public void flatMap(UserAction value, Collector<Void> out) throws Exception {
            statement.setInt(1, value.getUserId());
            statement.setString(2, value.getAction());
            statement.setString(3, value.getTimestamp());
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
